package org.msgcenter.mc.actor;

import cn.hutool.core.lang.TypeReference;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpStatus;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.Json;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import lombok.extern.slf4j.Slf4j;
import org.msgcenter.mc.common.JsonResult;
import org.msgcenter.mc.ds.CompanyWechatMsg;
import org.msgcenter.mc.ds.TurnOnOffMsg;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

@Slf4j
public class MsgToCompanyWechatActor extends AbstractVerticle {

  private EventBus eb;

  private boolean pushOnOff = true;

  private WebClient client;

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    super.start(startPromise);

    eb = vertx.eventBus();
    client = WebClient.create(vertx);

    onCompanyWechat();
  }

  @Override
  public void stop(Promise<Void> stopPromise) throws Exception {
    super.stop(stopPromise);
  }

  private void onCompanyWechat() {
    MessageConsumer<String> consumer = eb.consumer("service.push_msg.company_wechat_msg");

    BiConsumer<String, CompanyWechatMsg> errHandler = (errLog, cwm) -> {
      JsonResult<String, CompanyWechatMsg> jr = new JsonResult<>(HttpStatus.HTTP_INTERNAL_ERROR, errLog, cwm);
      eb.publish("service.push_msg.company_wechat_msg.err", JSONUtil.toJsonStr(jr));
    };

    Consumer<CompanyWechatMsg> okHandler = cmw -> {
      JsonResult<String, CompanyWechatMsg> jr = new JsonResult<>(HttpStatus.HTTP_OK, "", cmw);
      eb.publish("service.push_msg.company_wechat_msg.ok", JSONUtil.toJsonStr(jr));
    };

    consumer.handler(payload -> {
        CompanyWechatMsg msg = JSONUtil.toBean(payload.body(), new TypeReference<>() {
        }, true);

        // 对于站点消息 保存到相关数据表即可
        if (pushOnOff) {
          log.info("发送企业消息到数据库 {}", msg);
          Future<Message<Object>> dbAction = eb.request("db.post.add_company_wechat_msg", JSONUtil.toJsonStr(msg));

          String data = msg.getMsgContent();
          Buffer dataBuffer = Buffer.buffer(data);

          Future<HttpResponse<Buffer>> webReqAction = client
            .postAbs("https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=bd7f4f4b-1af0-4f61-bd14-02dd8c5e41da")
            .sendBuffer(dataBuffer)
            .onSuccess(res -> log.info("执行企业微信发送成功 {}", res.bodyAsString()))
            .onFailure(err -> log.error("执行企业微信发送错误 {}", err.getMessage()));

          CompositeFuture.all(dbAction, webReqAction)
            .onSuccess(ok -> okHandler.accept(msg))
            .onFailure(err -> errHandler.accept(StrUtil.format("发送企业消息出现错误 {}", err.getMessage()), msg));

        } else {
          String errLog = StrUtil.format("发送企业消息开关关闭 暂不发送消息 {}", msg);
          log.info(errLog);

          errHandler.accept(errLog, msg);
        }
      })
      .exceptionHandler(err -> log.error("发送企业消息出现错误 {}", err.getMessage()));
  }

  private void onSendTurnOff() {
    MessageConsumer<String> consumer = eb.consumer("service.on_off.company_wechat_msg");

    consumer.handler(payload -> {
      TurnOnOffMsg msg = JSONUtil.toBean(payload.body(), new TypeReference<>() {
      }, true);

      log.info("企业消息执行开关操作 : {}", msg);
      pushOnOff = msg.getStatus();
    });
  }
}
